package org.databandtech.rabbitmq;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;

/**
 * Flink streaming job that reads string messages from the RabbitMQ queue
 * {@code queueNameCONSUME} and republishes each one, unchanged, to the RabbitMQ
 * queue {@code queueNameSINK}.
 *
 * <p>The source is created with correlation ids enabled. According to the Flink
 * RabbitMQ connector documentation, that mode only gives its intended delivery
 * guarantee when checkpointing is enabled and the source is non-parallel, so this
 * job configures both.
 */
public class App 
{
    /**
     * Interval between Flink checkpoints, in milliseconds. With checkpointing on,
     * the RabbitMQ source acknowledges messages when a checkpoint completes
     * (see the Flink RabbitMQ connector documentation).
     */
    private static final long CHECKPOINT_INTERVAL_MS = 10000L;

    /**
     * Builds the source-to-sink pipeline and runs it under the job name {@code mq-sink}.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if the Flink job fails; the original failure is
     *         attached as the cause
     */
    public static void main( String[] args )
    {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Required for the correlation-id based guarantee requested on the source below.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

        // A host-based RMQConnectionConfig needs host, port, virtual host, user name and
        // password; leaving any of them unset makes build() fail. The values below are
        // RabbitMQ's stock settings for a broker on localhost.
        // NOTE(review): move credentials to external configuration for real deployments.
        // NOTE(review): RabbitMQ's default AMQP port is 5672 - confirm the broker really
        // listens on 5000.
        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost")
            .setPort(5000)
            .setVirtualHost("/")
            .setUserName("guest")
            .setPassword("guest")
            .build();

        final DataStream<String> stream = env
            .addSource(new RMQSource<String>(
                connectionConfig,            // config for the RabbitMQ connection
                "queueNameCONSUME",          // name of the RabbitMQ queue to consume
                true,                        // use correlation ids; can be false if only at-least-once is required
                new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
            .setParallelism(1);              // the connector documentation requires a non-parallel source for exactly-once

        stream.addSink(new RMQSink<String>(
            connectionConfig,            // config for the RabbitMQ connection
            "queueNameSINK",             // name of the RabbitMQ queue to send messages to
            new SimpleStringSchema()));  // serialization schema to turn Java objects into messages

        try {
            env.execute("mq-sink");
        } catch (Exception e) {
            // Propagate instead of only printing, so the launcher sees the failure.
            throw new IllegalStateException("Flink job 'mq-sink' failed", e);
        }
    }
}
